Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread #1358

Open
wants to merge 11 commits into
base: master
Choose a base branch
from

Conversation

psrvere
Copy link
Contributor

@psrvere psrvere commented Dec 4, 2024

TODO
[ ] Investigate CopyTest failure
[ ] Investigate SDK tests failure

This PR is a preparatory step towards solving nearly 50-60% CPU utilisation due to context shift between user space and kernel space for frequent read and write calls.

As first step, this PR moves command execution logic out of IOThread to a new entity CommandHandler. IOThread is now responsible for reading from and writing to the client connection, and CommandHandler is responsible for execution of command by interacting with ShardManager and WatchManager.

Currently, there is 1:1 mapping between IOThread and CommandHandler, and both of them are spawned by Resp Server. IOThread and CommandHandler communicate using 3 channels

  • ioThreadReadChan - to send command from IOThread to CommandHandler
  • ioThreadWriteChan - to send response from CommandHandler to IOThread
  • ioThreadErrChan - to send connection error signal from IOThread to CommandHandler, so that CommandHandler shuts down when IOHandler does

@psrvere psrvere changed the title Refactor - Separate out command execution responsibility to CommandHandler from IOThread Refactor - Move command execution responsibility to CommandHandler from IOThread Dec 4, 2024
@psrvere psrvere changed the title Refactor - Move command execution responsibility to CommandHandler from IOThread [DO NOT MERGE] Refactor - Move command execution responsibility to CommandHandler from IOThread Dec 5, 2024
@psrvere psrvere marked this pull request as ready for review December 5, 2024 11:39
@psrvere
Copy link
Contributor Author

psrvere commented Dec 5, 2024

@soumya-codes @lucifercr07 - please review.

@soumya-codes soumya-codes self-requested a review December 5, 2024 13:31
Copy link
Contributor

@soumya-codes soumya-codes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code changes look great Prateek. I have done a lot of nit picking around naming convention. Please feel free to open a separate PR to address them.

@@ -136,6 +136,7 @@ type performance struct {
ShardCronFrequency time.Duration `config:"shard_cron_frequency" default:"1s"`
MultiplexerPollTimeout time.Duration `config:"multiplexer_poll_timeout" default:"100ms"`
MaxClients int32 `config:"max_clients" default:"20000" validate:"min=0"`
MaxCmdHandlers int32 `config:"max_cmd_handlers" default:"20000" validate:"min=0"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this same as MaxClients?
Also since we do not plan to have more than 65536 client's can we have the type as int16 instead?

"github.com/dicedb/dice/internal/shard"
)

type Manager struct {
Copy link
Contributor

@soumya-codes soumya-codes Dec 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the behaviour, I think we got naming convention for Manager wrong in DiceDB. What do you think about Registrar/Registry?

func (m *Manager) UnregisterCommandHandler(id string) error {
m.ShardManager.UnregisterCommandHandler(id)
if cmdHandler, loaded := m.activeCmdHandlers.LoadAndDelete(id); loaded {
ch := cmdHandler.(*BaseCommandHandler)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use a comma, ok idiom here to be safe. Not needed here, but it will be failproof for future code changes.

id string
parser requestparser.Parser
shardManager *shard.ShardManager
adhocReqChan chan *cmd.DiceDBCmd
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you are refactoring this I would request you to change the name of adhocReqChan field to something more meaningful like watchReqChan.

}

type BaseCommandHandler struct {
CommandHandler
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make sure the structures are aligned in a way that they add lowest memory footprint..

@@ -12,7 +12,7 @@ import (

// RespAuth returns with an encoded "OK" if the user is authenticated
// If the user is not authenticated, it returns with an encoded error message
func (t *BaseIOThread) RespAuth(args []string) interface{} {
func (h *BaseCommandHandler) RespAuth(args []string) interface{} {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest we move this to the commandhandler.go file.

const defaultRequestTimeout = 6 * time.Second

var requestCounter uint32

// IOThread interface
type IOThread interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the Handler naming convention. It follows Go naming convention, where the data-structure names are encouraged to be verbs name. Based on this should we rename IOThread to IOHandler?

@@ -21,10 +18,9 @@ var (
ErrIOThreadNotFound = errors.New("io-thread not found")
)

func NewManager(maxClients int32, sm *shard.ShardManager) *Manager {
func NewManager(maxClients int32) *Manager {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.

@@ -104,15 +104,16 @@ func (manager *ShardManager) GetShard(id ShardID) *ShardThread {
return nil
}

// RegisterIOThread registers a io-thread with all Shards present in the ShardManager.
func (manager *ShardManager) RegisterIOThread(id string, request, processing chan *ops.StoreResponse) {
// RegisterCommandHandler registers a command handler with all Shards present in the ShardManager.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as earlier, what do you think of naming Manager to Registar/Registry? This name provides the precise nature of the work done by this type.

Copy link
Contributor

@lucifercr07 lucifercr07 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, minor comments.

Comment on lines 269 to +280
func GenerateUniqueIOThreadID() string {
count := atomic.AddUint64(&ioThreadCounter, 1)
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime
return fmt.Sprintf("W-%d-%d", timestamp, count)
}

func GenerateUniqueCommandHandlerID() string {
count := atomic.AddUint64(&cmdHandlerCounter, 1)
timestamp := time.Now().UnixNano()/int64(time.Millisecond) - startTime
return fmt.Sprintf("W-%d-%d", timestamp, count)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we optimise this as below? Also any reasons why we're keeping ID format for cmdHandler and IOThread same?

func GenerateUniqueID(prefix string, counter *uint64) string {
	count := atomic.AddUint64(counter, 1)
	timestamp := time.Now().UnixMilli() - startTime
	return fmt.Sprintf("%s-%d-%d", prefix, timestamp, count)
}

func GenerateUniqueIOThreadID() string {
	return GenerateUniqueID("W", &ioThreadCounter)
}

func GenerateUniqueCommandHandlerID() string {
	return GenerateUniqueID("W", &cmdHandlerCounter)
}

m.mu.Lock()
defer m.mu.Unlock()

if m.CommandHandlerCount() >= m.maxCmdHandlers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this returns error we've already registered an iothread, may be we can move this check earlier?
Also this'd initiate a server shutdown I believe would that be okay? Shouldn't we just drop the new client gracefully and continue server ops?


type Manager struct {
activeCmdHandlers sync.Map
numCmdHandlers atomic.Int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use atomic.Uint32 instead?

type Manager struct {
activeCmdHandlers sync.Map
numCmdHandlers atomic.Int32
maxCmdHandlers int32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above uint32?

Comment on lines +42 to +46
if responseChan != nil && preprocessingChan != nil {
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, preprocessingChan) // TODO: Change responseChan type to ShardResponse
} else if responseChan != nil && preprocessingChan == nil {
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we change it to?

m.ShardManager.RegisterCommandHandler(cmdHandler.ID(),cmdHandler.responseChan,cmdHandler.preprocessingChan)

Comment on lines +39 to +40
responseChan := cmdHandler.responseChan
preprocessingChan := cmdHandler.preprocessingChan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May be we can have checks in handler registration to mandate responseChan presence?

Copy link
Contributor

@lucifercr07 lucifercr07 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes, minor comments.

@@ -212,10 +213,12 @@ func RunTestServer(wg *sync.WaitGroup, opt TestServerOptions) {
cmdWatchSubscriptionChan := make(chan watchmanager.WatchSubscription)
gec := make(chan error)
shardManager := shard.NewShardManager(1, queryWatchChan, cmdWatchChan, gec)
ioThreadManager := iothread.NewManager(20000, shardManager)
ioThreadManager := iothread.NewManager(20000)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's use a config for this number.


// decomposeCommand is a function that takes a DiceDB command and breaks it down into smaller,
// manageable DiceDB commands for each shard processing. It returns a slice of DiceDB commands.
decomposeCommand func(ctx context.Context, thread *BaseIOThread, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
decomposeCommand func(ctx context.Context, h *BaseCommandHandler, DiceDBCmd *cmd.DiceDBCmd) ([]*cmd.DiceDBCmd, error)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think these methods could just be defined on the BaseCommandHandler? We're passing a pointer to the command handler in each of these methods and I'm wondering if we could avoid that.

@@ -54,12 +54,12 @@ func preProcessRename(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
// preProcessCopy prepares the COPY command for preprocessing by sending a GET command
// to retrieve the value of the original key. The retrieved value is used later in the
// decomposeCopy function to copy the value to the destination key.
func customProcessCopy(thread *BaseIOThread, diceDBCmd *cmd.DiceDBCmd) error {
func customProcessCopy(h *BaseCommandHandler, diceDBCmd *cmd.DiceDBCmd) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as the other comment, is it possible to define these methods on the command handler struct?

Comment on lines +44 to +46
} else if responseChan != nil && preprocessingChan == nil {
m.ShardManager.RegisterCommandHandler(cmdHandler.ID(), responseChan, nil)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe this can be placed before this if else condition and each of the two channels can be checked and initialized separately. It would avoid errors where either one of the two channels is somehow uninitialized and doesn't fit into the existing if-else logic.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants